*
* This is a faster and less resource-intensive job queue than JobQueueDB.
* All data for a queue using this class is placed into one redis server.
+ * The mediawiki/services/jobrunner background service must be set up and running.
*
- * There are eight main redis keys used to track jobs:
+ * There are eight main redis keys (per queue) used to track jobs:
* - l-unclaimed : A list of job IDs used for ready unclaimed jobs
* - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries
* - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
* entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
* ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
*
+ * The following keys are used to track queue states:
+ * - s-queuesWithJobs : A set of all queues with non-abandoned jobs
+ *
+ * The background service takes care of undelaying, recycling, and pruning jobs as well as
+ * removing s-queuesWithJobs entries as queues empty.
+ *
* Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
* Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
* All the keys are prefixed with the relevant wiki ID information.
* @throws RedisException
*/
protected function pushBlobs( RedisConnRef $conn, array $items ) {
- $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
+ $args = array( $this->encodeQueueName() );
+ // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
foreach ( $items as $item ) {
$args[] = (string)$item['uuid'];
$args[] = (string)$item['sha1'];
}
static $script =
<<<LUA
- local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
- if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
+ local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
+ -- First argument is the queue ID
+ local queueId = ARGV[1]
+ -- Next arguments all come in 4s (one per job)
+ local variadicArgCount = #ARGV - 1
+ if variadicArgCount % 4 ~= 0 then
+ return redis.error_reply('Unmatched arguments')
+ end
+ -- Insert each job into this queue as needed
local pushed = 0
- for i = 1,#ARGV,4 do
+ for i = 2,#ARGV,4 do
local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
if 1*rtimestamp > 0 then
pushed = pushed + 1
end
end
+ -- Mark this queue as having jobs
+ redis.call('sAdd',kQwJobs,queueId)
return pushed
LUA;
return $conn->luaEval( $script,
$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
$this->getQueueKey( 'z-delayed' ), # KEYS[4]
$this->getQueueKey( 'h-data' ), # KEYS[5]
+ $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
),
$args
),
- 5 # number of first argument(s) that are keys
+ 6 # number of first argument(s) that are keys
);
}
static $script =
<<<LUA
local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
+ local rTime = unpack(ARGV)
-- Pop an item off the queue
local id = redis.call('rPop',kUnclaimed)
- if not id then return false end
+ if not id then
+ return false
+ end
-- Allow new duplicates of this job
local sha1 = redis.call('hGet',kSha1ById,id)
if sha1 then redis.call('hDel',kIdBySha1,sha1) end
redis.call('hDel',kSha1ById,id)
-- Mark the jobs as claimed and return it
- redis.call('zAdd',kClaimed,ARGV[1],id)
+ redis.call('zAdd',kClaimed,rTime,id)
redis.call('hIncrBy',kAttempts,id,1)
return redis.call('hGet',kData,id)
LUA;
static $script =
<<<LUA
local kClaimed, kAttempts, kData = unpack(KEYS)
+ local uuid = unpack(ARGV)
-- Unmark the job as claimed
- redis.call('zRem',kClaimed,ARGV[1])
- redis.call('hDel',kAttempts,ARGV[1])
+ redis.call('zRem',kClaimed,uuid)
+ redis.call('hDel',kAttempts,uuid)
-- Delete the job data itself
- return redis.call('hDel',kData,ARGV[1])
+ return redis.call('hDel',kData,uuid)
LUA;
$res = $conn->luaEval( $script,
array(
$keys[] = $this->getQueueKey( $prop );
}
- return ( $conn->delete( $keys ) !== false );
+ $ok = ( $conn->delete( $keys ) !== false );
+ $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );
+
+ return $ok;
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
}
}
+ /**
+ * @return array List of (wiki,type) tuples for queues with non-abandoned jobs
+ * @throws JobQueueConnectionError
+ * @throws JobQueueError
+ */
+ public function getServerQueuesWithJobs() {
+ $queues = array();
+
+ $conn = $this->getConnection();
+ try {
+ $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
+ foreach ( $set as $queue ) {
+ $queues[] = $this->decodeQueueName( $queue );
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $queues;
+ }
+
/**
* @param IJobSpecification $job
* @return array
throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
}
+ /**
+ * @return string JSON
+ */
+ private function encodeQueueName() {
+ return json_encode( array( $this->type, $this->wiki ) );
+ }
+
+ /**
+ * @param string $name JSON
+ * @return array (type, wiki)
+ */
+ private function decodeQueueName( $name ) {
+ return json_decode( $name );
+ }
+
+ /**
+ * @param string $name
+ * @return string
+ */
+ private function getGlobalKey( $name ) {
+ $parts = array( 'global', 'jobqueue', $name );
+ if ( strlen( $this->key ) ) { // namespaced queue (for testing)
+ $parts[] = $this->key;
+ }
+
+ foreach ( $parts as $part ) {
+ if ( !preg_match( '/[a-zA-Z0-9_-.]+/', $part ) ) {
+ throw new InvalidArgumentException( "Key part characters are out of range." );
+ }
+ }
+
+ return implode( ':', $parts );
+ }
+
/**
* @param string $prop
* @param string|null $type